Tutorial 4: Lock-free programming in C++

In this tutorial, we discuss multi-consumer, multi-producer lock-free queues and briefly evaluate under which situations a lock-free queue might be more suitable than a lock-based one.

1 Introduction

We’ll be converting the fine-grained queue from Lecture 5 into a lock-free version. If you’re not familiar with the design, please refer to the Lecture 5 slides.

2 Initial attempt

Let’s consider the operations each consumer and producer will need to do, step by step.

2.1 Producers

Producers’ enqueue implementation:

  1. [A] Create a new Node (new_dummy).
  2. [B] Get the current work_node by reading m_queue_back.
  3. [C] Set the new job in the work_node.
  4. [D] Point work_node at new_dummy by setting its next pointer. This also simultaneously converts the work node into a “real” job-holding node.
  5. [E] Update m_queue_back so other producers know where the new end of the queue is.

We see that we need to perform three writes to publicly accessible memory, in steps [C], [D], and [E]. If we naively make this lock-free, even if we have no “data races” in the technical C++ sense (e.g. by using std::atomic everywhere), we may still have race conditions.

For example, a race condition happens when two producers attempt to place their job in the same dummy node, thus overwriting the other producer’s job.

This creates at least two potential problems:

  1. A producer may get in another producer’s way, e.g. by overwriting their job.
  2. A producer may not be correctly synchronised with consumers, causing them to read an invalid state.

We can solve problem (1) by choosing (E) to be the “definitive” source of truth for which producer is allowed to modify which nodes.

// queue field:
std::atomic<Node*> m_queue_back;

// doesn't work:
{
    Node* new_dummy = new Node();
    Node* work_node = m_queue_back;
    m_queue_back = new_dummy;
    // ... modify work_node
}

// works:
{
    Node* new_dummy = new Node();
    Node* work_node = m_queue_back.exchange(new_dummy);
    // ... modify work_node
}

In other words, we combine (B) and (E) into one atomic step. In doing so, each producer will now get their own Node to work with, and we avoid them stepping on each other’s toes.

To solve problem (2), we need to release our writes (the job) to the consumer in a way that they can reliably acquire. To do this, we will use release-acquire semantics that we have discussed previously. We will use a node’s next pointer as the shared memory location.

If we perform a release-write to work_node->next, any consumers that read that node’s ->next will synchronise-with the producer. If they read a nullptr, then there isn’t a valid job (it’s a dummy).

Note that the invariant of this data structure is that the last node (m_queue_back) always points to a dummy node. All non-dummy nodes will have a non-null next pointer, and only the dummy node has a null next pointer.

This results in the following algorithm:

  1. [A] Create a new Node (new_dummy). This will be the queue’s new dummy node after we are done.
  2. [B+E] Simultaneously (atomically) get the current dummy node, and also update the queue to point to our new dummy node: m_queue_back.exchange(new_dummy)
  3. [C] Set our job in the node we got from the above step (work_node).
  4. [D] Point our work_node at our new_dummy by release-storing its next pointer. This also simultaneously converts work_node to a “real” job-holding node, and also serves to synchronise-with the corresponding consumer that reads this pointer.

This algorithm solves both problems: the atomic exchange in [B+E] gives each producer exclusive ownership of its work_node (problem 1), and the release-store in [D] publishes the job to the consumer (problem 2).

2.2 Consumers

Next, we can move to the consumer side of the queue. Since we are writing a lock-free queue, we will not implement a blocking dequeue, only a non-blocking try_pop.

At a high level, these are the steps that we need to do:

  1. Read m_queue_front for the front of the queue.
  2. Check whether the node is a dummy or not, by reading its next pointer.
    1. If it’s a dummy, then we’re at the end of the queue, so we return an empty result.
    2. Otherwise, continue – there are some jobs in the queue.
  3. Update m_queue_front to point at the next node (ie. m_queue_front = m_queue_front->next)
  4. Return the job in the node we just removed from the queue (ie. return old_front->job)

Just like the case of producers, there are at least two potential race conditions, even if all the publicly accessible memory is atomic:

  1. A consumer may get in another consumer’s way, e.g. by consuming a node meant for another consumer.
  2. A consumer may not be correctly synchronised with producers, causing them to read an invalid state.

From the previous section, we know that we can solve problem (2) by synchronising the producer with us before we read the job, so we know we have to perform step (B) with an acquire memory ordering.

What about problem (1)?

We solved it previously by ensuring no two producers operate on the same node by using an atomic exchange to perform the read and update operations at the same time. Unfortunately, we cannot do that so easily here:

// The following cannot be made atomic
Node* node_to_consume = m_queue_front.exchange(
    m_queue_front->next // There is an entirely separate atomic load here
                        // ... furthermore we only want to exchange if it's not null
                        // so this code doesn't even do what we need it to do
);

The main problem is that we need to perform the exchange only conditionally (when the front is not the dummy), and the new value we want to install must itself be loaded through the very pointer we are trying to exchange…

A single atomic exchange cannot express this conditional, dependent update.

2.3 The Compare-And-Swap Pattern

In order to solve this problem, we must use the compare-and-swap pattern, sometimes known as the compare-exchange pattern. This is another atomic operation, similar to the existing ones we’ve seen so far (load, store, fetch_add, etc.).

As its name might suggest, it first performs a comparison on the memory location, checking if the current value is the same as our expected value. If it isn’t, then we give up. If it is, then we store our new value. In either case, we get the old/current value back. All of this is done in one atomic instruction. As you might be able to tell, this is a very powerful primitive.

The best way to reason about compare-and-swap is that it presents the operation of “set X from OLD to NEW”, rather than simply “set X to NEW”. This statement doesn’t make sense if X was not OLD before setting, and so the compare-and-swap should fail.

In this way, we are performing the exchange only if nobody else has done anything to it in the meantime — ensuring that our idea of reality and the actual reality match before we actually write anything.
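To make this concrete, here is a small sketch (the variable and function names are ours, not part of the queue) showing both outcomes of a compare-exchange:

```cpp
#include <atomic>

// Sketch: CAS as "set x from OLD to NEW", not just "set x to NEW".
// Returns the final value of x if both outcomes behaved as described.
int cas_demo()
{
    std::atomic<int> x { 10 };

    int expected = 10;
    // Succeeds: x really was 10, so it becomes 20.
    bool ok1 = x.compare_exchange_strong(expected, 20);

    expected = 999;
    // Fails: x is 20, not 999. x is left unchanged, and `expected`
    // is overwritten with the current value (20).
    bool ok2 = x.compare_exchange_strong(expected, 30);

    return (ok1 && !ok2 && expected == 20) ? x.load() : -1;
}
```

Note that on failure we did not have to re-load x ourselves; the current value came back to us through expected.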

2.3.1 Implementing fetch_add with the CAS Loop Pattern

The true power of compare-and-swap becomes apparent when we put it in a loop. Indeed, most algorithms that use compare-and-swap use it in a loop, since we (usually) have to handle the failure case (ie. when the current value is not our expected value).

This is how we might implement atomic fetch_add using compare-and-swap:

int fetch_add(std::atomic<int>& value, int add)
{
    // first, load the old value.
    int old_value = value.load();
    while(true)
    {
        // this is the new value which we want to store.
        int new_value = old_value + add;

        // set value *FROM old_value* TO new_value.
        // if it succeeded (returns true), we're done.
        if(value.compare_exchange_weak(old_value, new_value))
            return old_value;

        // here, we failed -- someone else changed `value` so that
        // it was no longer `old_value`.

        // `old_value` is taken by reference, so we get the current
        // value back even on failure (and so we don't need to load it ourselves).
        // go back to the top of the loop and retry, with our refreshed `old_value`.
    }
}
Snippet 1: A possible implementation of atomic<int>::fetch_add using a compare-and-swap loop

Of course, the real implementation of fetch_add is often just a single instruction on most architectures.

As mentioned in the comments, one thing to keep in mind is that compare_exchange takes in the old_value as a non-const reference. Whether or not the CAS succeeded, we still would have had to load value (to do the compare), and so we conveniently get that old value returned back to us.

2.3.2 Strong vs Weak compare-and-swap

One peculiarity you might have noticed is the appearance of _weak in compare_exchange — that implies the existence of a _strong variant, right? Indeed, you are correct.

You can expand the box below to learn more about why these two variants exist, but the key point is that the weak version can spuriously fail. That is, it can fail to perform the exchange even though the current value matched our expected value. The strong version is not susceptible to such weaknesses.

In general, the guidance is that the _strong version can be more expensive. If you are already using CAS in a loop and each iteration of the loop is relatively cheap, then you should probably use the _weak version. In our case, the only update we had to do was a simple addition, so we used _weak.

On the other hand, if updating the expected state on failure requires heavy computation, or the CAS is not even being used in a loop, then you should prefer the _strong version.
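As an illustration of the non-loop case, consider a one-shot “claim” (our own example, not part of the queue). A spurious failure here would wrongly report that someone else already claimed the flag, so _strong is the right choice:

```cpp
#include <atomic>

// Sketch: claiming a one-shot flag exactly once. This CAS is not in a
// loop, so a spurious _weak failure would incorrectly report "already
// claimed" -- we use compare_exchange_strong instead.
bool try_claim(std::atomic<bool>& claimed)
{
    bool expected = false;
    // Succeeds for exactly one caller: the one that sets the flag
    // from false to true.
    return claimed.compare_exchange_strong(expected, true);
}
```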

Read more from Raymond Chen.

Why can compare-and-swap fail spuriously, anyway?

Of course, we would prefer if compare_exchange couldn’t fail spuriously, at all. One of the main reasons that a weak version exists is due to hardware architectures.

On x86, we have the cmpxchg instruction, which is guaranteed by the architecture to not fail spuriously. However, other architectures use a different primitive, often called load-linked/store-conditional (LL/SC), to implement compare-and-swap.

The load of the current value is the “load-linked”; the CPU automatically handles “breaking” the LL/SC relationship (making the conditional store fail) if anybody updates the load-linked location, thus giving us the compare-and-swap behaviour. Unfortunately, some CPU architectures will invalidate the LL/SC even if nobody stored to the location. Common cases include:

  1. A context switch
  2. Another load-linked instruction
  3. Another store instruction
  4. A cosmic ray hit the CPU

2.4 Implementing try_pop with a CAS loop

Now that we know how to use compare-and-swap loops to carry out complex updates, we can use it to implement try_pop correctly. The key idea behind our implementation is that if any other thread managed to take a job while we were trying to, then we fail and try again — instead of leaving the queue in a broken state.

This gives us the following high-level algorithm:

  1. Read old_front = m_queue_front for the front of the queue.
  2. Loop:
    1. Check whether the node is a dummy or not by reading the next pointer.
      1. If it’s a dummy, then we’re at the end of the queue, so we return nullopt.
      2. Otherwise, continue
    2. Update m_queue_front to point at the next node with CAS: m_queue_front.compare_exchange_weak(old_front, next)
    3. If successful, break out of the loop
  3. Return the job in the node we just removed from the queue: return old_front->job

2.5 Demo 1: Putting the pieces together

We now translate our partial snippets into actual C++, along with the correct memory orders.

Godbolt link  |  Fsmbolt link

class JobQueue1
{
    // alias for std::memory_order
    using stdmo = std::memory_order;

    // A node is a dummy node if its next pointer is set to QUEUE_END
    // We use the next ptr to establish the synchronizes-with relationship
    // next is in charge of "releasing" job
    struct Node
    {
        std::atomic<Node*> next = QUEUE_END;
        Job job;
    };

    static inline Node* const QUEUE_END = nullptr;

    // Avoid false sharing with alignment
    alignas(64) std::atomic<Node*> m_queue_back;  // producer end
    alignas(64) std::atomic<Node*> m_queue_front; // consumer end

public:
    // Queue starts with a dummy node
    JobQueue1() //
        : m_queue_back(new Node())
        , m_queue_front(m_queue_back.load(stdmo::relaxed))
    {
    }

    ~JobQueue1()
    {
        // Assumption: no other threads are accessing the job queue
        Node* cur_node = m_queue_front.load(stdmo::relaxed);
        while(cur_node != QUEUE_END)
        {
            Node* next = cur_node->next;
            delete cur_node;

            cur_node = next;
        }
    }

public:
    void push(Job job)
    {
        Node* new_dummy = new Node();

        // Use m_queue_back.exchange to establish a global order of all enqueues
        //
        // We use acq_rel because:
        // - Release initialisation of `new_dummy`
        // - Similarly acquire initialisation of `work_node`
        // initialisation = what the Node constructor does
        Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);

        // now, work_node is unique for every producer (push)

        // First write the job
        work_node->job = job;

        // Now "release" the job to consumers,
        // and also append to LL at the same time
        work_node->next.store(new_dummy, stdmo::release);
    }

    std::optional<Job> try_pop()
    {
        // Splice node from the front of queue, but only if it's not dummy
        // Successfully splicing a node establishes global order of pops

        Node* old_front = m_queue_front.load(stdmo::relaxed);
        while(true)
        {
            // "Acquire" job if it exists
            // Also use next pointer to know what to update m_queue_front to
            Node* new_front = old_front->next.load(stdmo::acquire);
            if(new_front == QUEUE_END)
            {
                // Observed dummy node, so we can abort as the queue is empty
                // (or close to it)
                return std::nullopt;
            }

            // for now, we use relaxed.
            if(m_queue_front.compare_exchange_weak(old_front, //
                   new_front,                                 //
                   stdmo::relaxed))
            {
                // Node now belongs to us
                break;
            }

            // We couldn't update m_queue_front, so someone else successfully
            // popped a node. We'll just loop.
        }

        Job job = old_front->job;
        delete old_front;

        return job;
    }
};
Snippet 2: Initial attempt of our lock-free queue

Let’s test it out with a configuration of 2 producers and 3 consumers!

We’ll have the 2 producers put in a bunch of jobs, and the consumers will each read out some of them and sum their ids in a consumer-specific partial sum. main will wait for all threads to complete, and then sum the partial sums.

#include <cstdio>
#include <thread>

int main()
{
    JobQueue1 queue;

    auto producer1 = std::thread([&queue]() {
        for(int i = 1; i <= 150000; i++)
            queue.push(Job { i, i });
    });

    auto producer2 = std::thread([&queue]() {
        for(int i = 150001; i <= 300000; i++)
            queue.push(Job { i, i });
    });

    // 3 partial sums
    int sum1 = 0;
    int sum2 = 0;
    int sum3 = 0;

    auto consumer_fn = [&queue](int& sum) {
        // Sum 100000 things
        for(int i = 0; i < 100000; i++)
        {
            while(true)
            {
                std::optional<Job> job = queue.try_pop();
                if(job)
                {
                    sum += job->id;
                    break;
                }
            }
        }
    };

    auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
    auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
    auto consumer3 = std::thread(consumer_fn, std::ref(sum3));

    producer1.join();
    producer2.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();

    printf("Sum of 1 to 300000 modulo integer limit: %d\n", sum1 + sum2 + sum3);
}
Snippet 3: Some testing code for our queue, which we will reuse for later

Running it normally might give you an answer, and that answer might even be the correct answer. But if we run with ThreadSanitizer enabled, we get a lot of warnings, not limited to data races!

(Let this be a warning to always test with ThreadSanitizer!)

3 Problem #1: the ABA problem

When using compare-and-swap loops, one thing that must be immediately addressed is the ABA problem.

We made the assumption that, because we perform the read-check-update sequence atomically via a CAS loop, we only succeed if the value of m_queue_front was still old_front (the expected value) at the time of the write.

But if m_queue_front can be set to another value, and then set back to the same value again before the CAS is performed, the CAS would succeed! This can happen because previously freed memory addresses may be reused by subsequent calls to new Node().

This is where the name comes from: a value initially has value A, is set to B, and then back to A.

This is not always a problem. For example, our fetch_add implementation above really does work, as we don’t rely on the property that A actually never changed in the loop. Instead all we really care about is that the new value is 5 higher than the old value.

In our case, we DO care that the queue has not changed, since it’s important that new_front really does refer to the second node in the queue. We only have this guarantee if the queue really did not change between the start and end of the loop.

3.1 Illustration of ABA problem

Here’s an example of the ABA problem causing things to break.

First, a consumer tries to pop a node, and sees this state before entering the CAS loop:

ABA: initial state

Now, the OS is mean to our consumer and puts it to sleep. In the meantime, the queue goes through many pushes and pops, allocating and deallocating nodes as it goes. Our consumer still holds a pointer to (what it thinks is) the old_front, address 0x20.

However, while it may point to a node, the identity of the node is now different! Previously we had Job{5, 5} — but now it’s Job{100, 100}, a completely different node that just happened to have the same address.

On the other hand, new_front (the second node in the queue when the consumer last checked) might have already been deallocated, and now the pointer points to garbage:

ABA: next state

Now, the consumer wakes up. It tries to perform the compare-and-swap, and succeeds. This is because the pointer value of old_front is the same as the pointer value of m_queue_front — even though the identity of the node has changed!

After succeeding at the CAS, we set the queue’s front to point at what used to be the second node, but now points at garbage:

ABA: final broken state

Of course, now the queue is completely broken, plus our front pointer points at garbage memory!

3.2 Demo 2: Solving ABA with generation-counted pointers

There are several ways to solve the ABA problem, but it is most commonly solved by including a “generation counter” alongside whatever data we’re performing the CAS on.

3.2.1 Explaining generation counters

How do generation counters work? The idea is that we tie a unique number together with the pointer, so that even if the address is the same, the value (which is a combination of both the pointer and the counter) is different.

We use a 64-bit integer for our counter; this means that, for us to get a false positive on the compare-and-swap (leading to the ABA problem), the following conditions must hold:

  1. a new front node was allocated at the same address as our old one
  2. 2^64 (18446744073709551616) pops have happened (causing the counter to overflow and wrap around) while our consumer was put to sleep

This is highly unlikely, mainly due to the second condition. So with very high probability, we can avoid the ABA problem by using generation counters.

3.2.2 Using generation counters

In our case, we just add a generation count next to the front pointer, like so:

struct alignas(16) GenNodePtr
{
    Node* node;
    uintptr_t gen;
};

static_assert(std::atomic<GenNodePtr>::is_always_lock_free);

alignas(64) std::atomic<Node*> m_queue_back;       // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end
Snippet 4: Adding a generation count to m_queue_front

Note that we don’t need to do this for the back pointer; since we are not using a compare-and-swap loop for push, we are not vulnerable to the ABA problem.

As for the rest of the queue, the implementation is similar to demo 1, just with a few changes to adapt to the generation-counted head pointer.

Godbolt link  |  Fsmbolt link

// demo2.cpp

#include <atomic>
#include <optional>

struct Job
{
    int id;
    int data;
};

// (Incorrect)
// Unbounded lock free queue with push and non-blocking try_pop
//
// Avoids ABA using generation counter
// This design is incorrect as it (still) suffers from UAF.
class JobQueue2
{
    // alias for std::memory_order
    using stdmo = std::memory_order;

    // A node is a dummy node if its next pointer is set to QUEUE_END
    // We use the next ptr to establish the synchronizes-with relationship
    // next is in charge of "releasing" job
    struct Node
    {
        std::atomic<Node*> next = QUEUE_END;
        Job job;
    };

    static inline Node* const QUEUE_END = nullptr;

    struct alignas(16) GenNodePtr
    {
        Node* node;
        uintptr_t gen;
    };

    static_assert(std::atomic<GenNodePtr>::is_always_lock_free);

    alignas(64) std::atomic<Node*> m_queue_back;       // producer end
    alignas(64) std::atomic<GenNodePtr> m_queue_front; // consumer end

public:
    // Queue starts with a dummy node
    JobQueue2() //
        : m_queue_back(new Node())
        , m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
    {
    }

    ~JobQueue2()
    {
        // Assumption: no other threads are accessing the job queue
        Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
        while(cur_node != QUEUE_END)
        {
            Node* next = cur_node->next;
            delete cur_node;

            cur_node = next;
        }
    }

public:
    void push(Job job)
    {
        // same as the previous implementation
        Node* new_dummy = new Node();
        Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);

        work_node->job = job;
        work_node->next.store(new_dummy, stdmo::release);
    }

    std::optional<Job> try_pop()
    {
        // Splice node from the front of queue, but only if it's not dummy
        // Successfully splicing a node establishes global order of pops

        // Relaxed ordering because we don't need to synchronize with other
        // consumers, we need to synchronize with the producer that made the
        // node, which is done via the `next` ptr.

        GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
        while(true)
        {
            // this part is similar -- we just need an additional `.node` to
            // get the node out from the GenNodePtr
            Node* old_front_next = old_front.node->next.load(stdmo::acquire);
            if(old_front_next == QUEUE_END)
                return std::nullopt;

            // note that the generation is strictly increasing
            GenNodePtr new_front { old_front_next, old_front.gen + 1 };

            // this part is also similar, except we CAS with the GenNodePtr instead
            // of just a simple Node*.
            if(m_queue_front.compare_exchange_weak(old_front, new_front, stdmo::relaxed))
            {
                // Node now belongs to us
                break;
            }
        }

        Job job = old_front.node->job;
        delete old_front.node;

        return job;
    }
};

#include <cstdio>
#include <thread>

int main()
{
    JobQueue2 queue;

    auto producer1 = std::thread([&queue]() {
        for(int i = 1; i <= 150000; i++)
            queue.push(Job { i, i });
    });

    auto producer2 = std::thread([&queue]() {
        for(int i = 150001; i <= 300000; i++)
            queue.push(Job { i, i });
    });

    // 3 partial sums
    int sum1 = 0;
    int sum2 = 0;
    int sum3 = 0;

    auto consumer_fn = [&queue](int& sum) {
        // Sum 100000 things
        for(int i = 0; i < 100000; i++)
        {
            while(true)
            {
                std::optional<Job> job = queue.try_pop();
                if(job)
                {
                    sum += job->id;
                    break;
                }
            }
        }
    };

    auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
    auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
    auto consumer3 = std::thread(consumer_fn, std::ref(sum3));

    producer1.join();
    producer2.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();

    printf("Sum of 1 to 300000 modulo integer limit: %d\n", sum1 + sum2 + sum3);
}
Snippet 5: Our second attempt at a lock-free queue

Let’s take a look at the try_pop function to see how the generation counters are used:

GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
while(true)
{
    // this part is similar -- we just need an additional `.node` to
    // get the node out from the GenNodePtr
    Node* old_front_next = old_front.node->next.load(stdmo::acquire);
    if(old_front_next == QUEUE_END)
        return std::nullopt;

    // note that the generation is strictly increasing
    GenNodePtr new_front { old_front_next, old_front.gen + 1 };

    // this part is also similar, except we CAS with the GenNodePtr instead
    // of just a simple Node*.
    if(m_queue_front.compare_exchange_weak(old_front, new_front, stdmo::relaxed))
    {
        // Node now belongs to us
        break;
    }
}
Snippet 6: The try_pop method

We’re now loading a GenNodePtr when we get the old front, and we’re also using it to do the compare-and-swap. The most important thing here is that the generation of our new_front (if we succeeded at the CAS) is “newer” than the old front.

As explained above, the use of generation counters means that we shouldn’t get an ABA problem any more. So now, we should be good right? Let’s run it with ThreadSanitizer…

$ ./demo2.tsan
WARNING: ThreadSanitizer: heap-use-after-free (pid=1889172)
  Atomic read of size 8 at 0x7b0400002800 by thread T4:
    <...>

  Previous write of size 8 at 0x7b0400002800 by thread T3:
    <...>

Ouch. We still have races, because we still have another problem!

Extra: lock-free-ness of GenNodePtr

How can we be sure that our GenNodePtr is actually lock-free? The keen-eyed among you might have already noticed that std::atomic<T> doesn’t actually make any guarantees about being lock-free, only that it is atomic.

There are two ways we can tell: the is_lock_free method on an instance, and the static data member is_always_lock_free. The reason there are two is that a given object might only be lock-free if it is suitably aligned, and the alignment can be runtime-dependent.

If we want to know whether a type is always lock free, then we can use std::atomic<T>::is_always_lock_free — this is true if the type is always lock-free, regardless of its alignment. We used a static_assert on our GenNodePtr to make sure.

16-byte atomic compare-and-swap is done with the cmpxchg16b instruction on x86_64. This isn’t supported by some very old x86_64 CPUs, so note that we had to pass -march=native to the compiler to ensure that it uses this instruction; otherwise, our static assertion will fail.
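A quick sketch of both queries, using our own example type rather than GenNodePtr:

```cpp
#include <atomic>

// Sketch: the two ways to query lock-freeness of an atomic.
bool int_atomics_are_lock_free()
{
    // Compile-time, per-type guarantee: true only if EVERY object of
    // this type is lock-free, regardless of alignment.
    static_assert(std::atomic<int>::is_always_lock_free);

    // Run-time, per-object answer for this particular instance.
    std::atomic<int> x { 0 };
    return x.is_lock_free();
}
```

(On mainstream 64-bit platforms both answers are "yes" for int; the standard itself guarantees lock-freedom only for std::atomic_flag.)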

4 Problem #2: use-after-free (UAF)

While we won’t succeed at performing the CAS when other consumers have changed the queue, we’re still dereferencing a pointer obtained from a possibly stale value of m_queue_front (which we stored as old_front).

This can cause data races: a new node may be allocated at the same address (regardless of generation!), and the constructor of its std::atomic member can then race with a concurrent .load() on the same object.

Solutions to this problem generally fall under a few classes:

  1. Never free anything (ie. just leak all the memory).
  2. Mark nodes for deletion while there are still threads in try_pop, and when the last one leaves, we free all of them at once.
  3. Use reference counting (ie. an atomic shared_ptr) to know when there are no more remaining references to a particular object.
  4. Use hazard pointers to track which threads have references to which objects.

We’ll go with a variant of solution (2). Rather than trying to delete when the last try_pop leaves, which can be very rare, we simply won’t try to delete nodes at all while the queue is alive, but store them somewhere so that we can delete them all at once when the queue is being destructed.

However, this would still allow a memory leak to occur, and our memory usage can keep increasing as the queue is used more. To avoid this, we’ll reuse nodes that were deleted and put them back in the queue when we need new nodes.

This essentially functions as a “free list” of nodes that we recycle for future use. We only allocate new nodes from the heap when our recycling centre has been exhausted. Note that this means that the peak usage of our queue can still be quite high, and if the queue is very long for only a short time, we’ll keep those recycled nodes around doing nothing.

However, this is still a relatively simple solution, so we’ll stick with it.

What about atomic shared_ptr?

The keen-eyed among you might have noticed that the C++ standard library actually has a specialisation of std::atomic for shared_ptr. So, why don’t we use it?

Well, the problem is that it’s not lock-free. None of the 3 major implementations of the STL (libc++, libstdc++, and MSVC’s STL) have a lock-free implementation of this — it just uses mutexes under the hood. Note that nowhere in the specification does it say that std::atomic must actually be lock-free!

(Only std::atomic_flag is guaranteed to be lock-free).

Since we’re interested in building a specifically lock-free queue, we opted not to use this. There are a few CppCon talks about atomic shared pointers you can watch, if you’re interested:

  1. Timur Doumler: A Lock-Free Atomic Shared Pointer in Modern Cpp
  2. Timur Doumler: same as above, but longer and at a different conference
  3. Daniel Anderson: a slightly different approach

4.1 Implementing the Recycling Centre

As environmentally conscious students, we should recycle whenever possible. Since we don’t care about the order that recycled nodes are used, we can just use a stack for it, rather than a queue. However, since we still want the queue overall to be lock-free, our recycling centre also needs to be lock-free!

Yes, our concurrent queue implementation has a concurrent stack implementation hidden inside (:

First, for the recycling node stack, we just need a single pointer to keep track of the top of the stack:

// we use these as sentinel values. STACK_END must be distinct from
// QUEUE_END (nullptr) and from any real node address; note that
// `QUEUE_END + 1` would be undefined behaviour (pointer arithmetic on
// a null pointer), so we construct the sentinel with a cast instead.
static inline Node* const QUEUE_END = nullptr;
static inline Node* const STACK_END = reinterpret_cast<Node*>(1);

struct alignas(16) GenNodePtr
{
    Node* node;
    uintptr_t gen;
};

static_assert(std::atomic<GenNodePtr>::is_always_lock_free);

alignas(64) std::atomic<Node*> m_queue_back;              // producer end
alignas(64) std::atomic<GenNodePtr> m_queue_front;        // consumer end
alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack
Snippet 7: The extra field we need for the recycling stack

As you can see, most of it is the same. We added a new sentinel value to use as the “empty stack” value. Before we forget, we still need to clean up the stack in the queue’s destructor, as promised. It’s relatively simple:

// we need to clean up the recycled nodes as well
cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
while(cur_node != STACK_END)
{
    Node* next = cur_node->next;
    delete cur_node;

    cur_node = next;
}
Snippet 8: Walking the recycling stack to clean up nodes

Now for the fun part, which is allocating a new node (or getting one from our recycling stack):

// either get a node from the recycling stack if we have some,
// or allocate a new one if we don't.
Node* get_recycled_node_or_allocate_new()
{
    GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
    while(true)
    {
        // if we have no more recycled nodes, return a newly-allocated one.
        if(old_stack_top.node == STACK_END)
            return new Node();

        Node* cur_stack_next = old_stack_top.node->next.load(stdmo::relaxed);
        GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };

        if(m_recycled_stack_top.compare_exchange_weak( //
               old_stack_top,                          //
               new_stack_top,                          //
               stdmo::relaxed))
        {
            // successfully got a node from the recycling centre
            return old_stack_top.node;
        }
    }
}
Snippet 9: This function has a very descriptive name

Next, we need a way to put things into this recycling stack when we pop nodes from our queue. The implementation is very similar to our classic CAS-loop-with-generation-counter pattern:

// Put node in recycling centre
void add_node_to_recycling_stack(Node* node)
{
    // Standard CAS loop with generation counter to avoid ABA.
    GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
    while(true)
    {
        node->next.store(old_stack_top.node, stdmo::relaxed);
        GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };

        if(m_recycled_stack_top.compare_exchange_weak( //
               old_stack_top,                          //
               new_stack_top,                          //
               stdmo::relaxed))
        {
            break;
        }
    }
}
Snippet 10: This function also has a very descriptive name

4.2 Using the recycling centre

All that’s left is to use these functions when we create and delete nodes (instead of calling new and delete directly). That’s fairly straightforward:

void push(Job job)
{
    Node* new_dummy = get_recycled_node_or_allocate_new();
    new_dummy->next.store(QUEUE_END, stdmo::relaxed);
    // the rest is the same...
}

Note that for push, we have to explicitly reset the next pointer to our sentinel, because it might have been recycled and have stale data inside.

std::optional<Job> try_pop()
{
    // most of it is the same...

    // only this part changes
    Job job = old_front.node->job;
    add_node_to_recycling_stack(old_front.node);

    return job;
}

A question you might want to ask is: can we still get a use-after-free situation?

Answer

No, we can’t. By definition, we never really free nodes, only add them to the recycling centre. Thus, we will never end up dereferencing a deallocated node, which was our UAF problem in the first place.

We might end up checking the ->next of a node that’s already been recycled, but that’s fine — it simply won’t match with what we see in the queue, and we’ll retry our CAS loop.

4.3 Demo 3: Solving UAF with recycling

In our recycling stack implementation, we’ve used memory_order_relaxed, since the nodes in the stack don’t contain any data other than the next pointer. We can imagine that threads only need to observe m_recycled_stack_top, and that atomicity gives us the guarantees we need.

Godbolt link  |  Fsmbolt link

Code
// demo3.cpp

#include <atomic>
#include <optional>

struct Job
{
    int id;
    int data;
};

// (has subtle bug)
// Unbounded lock free queue with push and non-blocking try_pop
//
// Avoids ABA using generation counter
// Avoids UAF by reusing nodes instead of deallocating them
class JobQueue3
{
    // alias for std::memory_order
    using stdmo = std::memory_order;

    // A node is a dummy node if its next pointer is set to QUEUE_END
    // We use the next ptr to establish the synchronizes-with relationship
    // next is in charge of "releasing" job
    struct Node
    {
        std::atomic<Node*> next = QUEUE_END;
        Job job;
    };

    // we use these as sentinel values.
    static inline Node* const QUEUE_END = nullptr;
    static inline Node* const STACK_END = QUEUE_END + 1;

    struct alignas(16) GenNodePtr
    {
        Node* node;
        uintptr_t gen;
    };

    static_assert(std::atomic<GenNodePtr>::is_always_lock_free);

    alignas(64) std::atomic<Node*> m_queue_back;              // producer end
    alignas(64) std::atomic<GenNodePtr> m_queue_front;        // consumer end
    alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack

public:
    // Queue starts with a dummy node
    JobQueue3() //
        : m_queue_back(new Node())
        , m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
        , m_recycled_stack_top(GenNodePtr { STACK_END, 1 })
    {
    }

    ~JobQueue3()
    {
        // Assumption: no other threads are accessing the job queue
        Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
        while(cur_node != QUEUE_END)
        {
            Node* next = cur_node->next;
            delete cur_node;

            cur_node = next;
        }

        // we need to clean up the recycled nodes as well
        cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
        while(cur_node != STACK_END)
        {
            Node* next = cur_node->next;
            delete cur_node;

            cur_node = next;
        }
    }

    // either get a node from the recycling stack if we have some,
    // or allocate a new one if we don't.
    Node* get_recycled_node_or_allocate_new()
    {
        GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
        while(true)
        {
            // if we have no more recycled nodes, return a newly-allocated one.
            if(old_stack_top.node == STACK_END)
                return new Node();

            Node* cur_stack_next = old_stack_top.node->next.load(stdmo::relaxed);
            GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };

            if(m_recycled_stack_top.compare_exchange_weak( //
                   old_stack_top,                          //
                   new_stack_top,                          //
                   stdmo::relaxed))
            {
                // successfully got a node from the recycling centre
                return old_stack_top.node;
            }
        }
    }

    // Put node in recycling centre
    void add_node_to_recycling_stack(Node* node)
    {
        // Standard CAS loop with generation counter to avoid ABA.
        GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
        while(true)
        {
            node->next.store(old_stack_top.node, stdmo::relaxed);
            GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };

            if(m_recycled_stack_top.compare_exchange_weak( //
                   old_stack_top,                          //
                   new_stack_top,                          //
                   stdmo::relaxed))
            {
                break;
            }
        }
    }


public:
    void push(Job job)
    {
        Node* new_dummy = get_recycled_node_or_allocate_new();
        new_dummy->next.store(QUEUE_END, stdmo::relaxed);
        // the rest is the same...

        Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);

        work_node->job = job;
        work_node->next.store(new_dummy, stdmo::release);
    }

    std::optional<Job> try_pop()
    {
        // most of it is the same...

        // Splice node from the front of queue, but only if it's not dummy
        // Successfully splicing a node establishes global order of pops

        // Relaxed ordering because we don't need to synchronize with other
        // consumers, we need to synchronize with the producer that made the
        // node, which is done via the `next` ptr.

        GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
        while(true)
        {
            // this part is similar -- just need an additional `.node` to get
            // the node out from the GenNodePtr
            Node* old_front_next = old_front.node->next.load(stdmo::acquire);
            if(old_front_next == QUEUE_END)
                return std::nullopt;

            // note that the generation is strictly increasing
            GenNodePtr new_front { old_front_next, old_front.gen + 1 };

            // this part is also similar, except we CAS with the GenNodePtr instead
            // of just a simple Node*.
            if(m_queue_front.compare_exchange_weak(old_front, new_front, stdmo::relaxed))
            {
                // Node now belongs to us
                break;
            }
        }

        // only this part changes
        Job job = old_front.node->job;
        add_node_to_recycling_stack(old_front.node);

        return job;
    }
};

#include <cstdio>
#include <thread>

int main()
{
    JobQueue3 queue;

    auto producer1 = std::thread([&queue]() {
        for(int i = 1; i <= 150000; i++)
            queue.push(Job { i, i });
    });

    auto producer2 = std::thread([&queue]() {
        for(int i = 150001; i <= 300000; i++)
            queue.push(Job { i, i });
    });

    // 3 partial sums
    int sum1 = 0;
    int sum2 = 0;
    int sum3 = 0;

    auto consumer_fn = [&queue](int& sum) {
        // Sum 100000 things
        for(int i = 0; i < 100000; i++)
        {
            while(true)
            {
                std::optional<Job> job = queue.try_pop();
                if(job)
                {
                    sum += job->id;
                    break;
                }
            }
        }
    };

    auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
    auto consumer2 = std::thread(consumer_fn, std::ref(sum2));
    auto consumer3 = std::thread(consumer_fn, std::ref(sum3));

    producer1.join();
    producer2.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();

    printf("Sum of 1 to 300000 modulo integer limit: %d\n", sum1 + sum2 + sum3);
}

5 Problem #3: Data race in recycling stack

Unfortunately, our environmentally-friendly recycling centre has a data race:

==================
WARNING: ThreadSanitizer: data race (pid=3978284)
  Write of size 8 at 0x7b0400000008 by thread T1:
     #0 JobQueue11B::push(Job) demo3.cpp:134 (demo3.tsan+0xe163d)
     <...>

  Previous read of size 8 at 0x7b0400000008 by thread T3:
     #0 JobQueue11B::try_pop() demo31.cpp:171 (demo3.tsan+0xe0ecd)
     <...>

That corresponds to these two pieces of code:

work_node->job = job;
Job job = old_front.node->job;

Take careful note of the ordering of the race here. According to ThreadSanitizer, we tried to read the job first, and then perform the write. While it’s true that when a data race occurs, it’s hard to say which one happened “first”, the order that ThreadSanitizer presents is useful to keep in mind as it’s usually the more intuitive ordering of events.

In this ordering, it would suggest that the read in try_pop can potentially see a FUTURE value, written by a subsequent push. This is now possible because of node recycling!

To make this race more obvious, let’s draw out our favourite synchronisation diagram:

The synchronisation diagram for our race

As we can indeed see, there’s no synchronisation between loading the job in T1 and storing it in T3.

5.1 Demo 4: Correct queue implementation

To resolve this, we can make T1 synchronise-with T2 during the load/store of node->next. The data race happens when the node that T1 adds to the recycling stack is the one that T2 takes out. Thus, if T2 does indeed take out node X, it would have read the value stored by T1. By using release-acquire here, we would have established a synchronises-with relationship:

The synchronisation diagram for our fixed implementation

With the transitive nature of happens-before, we now have a correct synchronisation between T1 and T3. To do this, we just need to change a few lines of code:

Node* get_recycled_node_or_allocate_new()
{
    GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
    while(true)
    {
        if(old_stack_top.node == STACK_END)
        {
            return new Node();
        }

        // here: use **acquire**. synchronise with the release-store of
        // node->next in `add_node_to_recycling_stack`
        Node* cur_stack_next = old_stack_top.node->next.load(stdmo::acquire);
        // the rest is the same...
    }
}

void add_node_to_recycling_stack(Node* node)
{
    GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
    while(true)
    {
        // here: use **release**. synchronise with the acquire-load of
        // node->next in `get_recycled_node_or_allocate_new`
        node->next.store(old_stack_top.node, stdmo::release);
        // the rest is the same...
    }
}

If we run our fixed implementation, we are finally free of ThreadSanitizer errors!

Godbolt link  |  Fsmbolt link

Code
// demo4.cpp

#include <atomic>
#include <optional>

struct Job
{
    int id;
    int data;
};

// (is [not quite] correct)
// Unbounded lock free queue with push and non-blocking try_pop
//
// Avoids ABA using generation counter
// Avoids UAF by reusing nodes instead of deallocating them
class JobQueue4
{
    // alias for std::memory_order
    using stdmo = std::memory_order;

    // A node is a dummy node if its next pointer is set to QUEUE_END
    // We use the next ptr to establish the synchronizes-with relationship
    // next is in charge of "releasing" job
    struct Node
    {
        std::atomic<Node*> next = QUEUE_END;
        Job job;
    };

    // we use these as sentinel values.
    static inline Node* const QUEUE_END = nullptr;
    static inline Node* const STACK_END = QUEUE_END + 1;

    struct alignas(16) GenNodePtr
    {
        Node* node;
        uintptr_t gen;
    };

    static_assert(std::atomic<GenNodePtr>::is_always_lock_free);

    alignas(64) std::atomic<Node*> m_queue_back;              // producer end
    alignas(64) std::atomic<GenNodePtr> m_queue_front;        // consumer end
    alignas(64) std::atomic<GenNodePtr> m_recycled_stack_top; // recycled node stack

public:
    // Queue starts with a dummy node
    JobQueue4() //
        : m_queue_back(new Node())
        , m_queue_front(GenNodePtr { m_queue_back.load(stdmo::relaxed), 1 })
        , m_recycled_stack_top(GenNodePtr { STACK_END, 1 })
    {
    }

    ~JobQueue4()
    {
        // Assumption: no other threads are accessing the job queue
        Node* cur_node = m_queue_front.load(stdmo::relaxed).node;
        while(cur_node != QUEUE_END)
        {
            Node* next = cur_node->next;
            delete cur_node;

            cur_node = next;
        }

        // we need to clean up the recycled nodes as well
        cur_node = m_recycled_stack_top.load(stdmo::relaxed).node;
        while(cur_node != STACK_END)
        {
            Node* next = cur_node->next;
            delete cur_node;

            cur_node = next;
        }
    }

    // either get a node from the recycling stack if we have some,
    // or allocate a new one if we don't.
    Node* get_recycled_node_or_allocate_new()
    {
        GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
        while(true)
        {
            if(old_stack_top.node == STACK_END)
            {
                return new Node();
            }

            // here: use **acquire**. synchronise with the release-store of
            // node->next in `add_node_to_recycling_stack`
            Node* cur_stack_next = old_stack_top.node->next.load(stdmo::acquire);
            // the rest is the same...

            GenNodePtr new_stack_top { cur_stack_next, old_stack_top.gen + 1 };

            if(m_recycled_stack_top.compare_exchange_weak( //
                   old_stack_top,                          //
                   new_stack_top,                          //
                   stdmo::relaxed))
            {
                // successfully got a node from the recycling centre
                return old_stack_top.node;
            }
        }
    }

    // Put node in recycling centre
    void add_node_to_recycling_stack(Node* node)
    {
        GenNodePtr old_stack_top = m_recycled_stack_top.load(stdmo::relaxed);
        while(true)
        {
            // here: use **release**. synchronise with the acquire-load of
            // node->next in `get_recycled_node_or_allocate_new`
            node->next.store(old_stack_top.node, stdmo::release);
            // the rest is the same...
            GenNodePtr new_stack_top { node, old_stack_top.gen + 1 };

            if(m_recycled_stack_top.compare_exchange_weak( //
                   old_stack_top,                          //
                   new_stack_top,                          //
                   stdmo::relaxed))
            {
                break;
            }
        }
    }


public:
    void push(Job job)
    {
        Node* new_dummy = get_recycled_node_or_allocate_new();
        new_dummy->next.store(QUEUE_END, stdmo::relaxed);

        Node* work_node = m_queue_back.exchange(new_dummy, stdmo::acq_rel);

        work_node->job = job;
        work_node->next.store(new_dummy, stdmo::release);
    }

    std::optional<Job> try_pop()
    {
        // most of it is the same...

        // Splice node from the front of queue, but only if it's not dummy
        // Successfully splicing a node establishes global order of pops

        // Relaxed ordering because we don't need to synchronize with other
        // consumers, we need to synchronize with the producer that made the
        // node, which is done via the `next` ptr.

        GenNodePtr old_front = m_queue_front.load(stdmo::relaxed);
        while(true)
        {
            // this part is similar -- just need an additional `.node` to get
            // the node out from the GenNodePtr
            Node* old_front_next = old_front.node->next.load(stdmo::acquire);
            if(old_front_next == QUEUE_END)
                return std::nullopt;

            // note that the generation is strictly increasing
            GenNodePtr new_front { old_front_next, old_front.gen + 1 };

            // this part is also similar, except we CAS with the GenNodePtr instead
            // of just a simple Node*.
            if(m_queue_front.compare_exchange_weak(old_front, //
                new_front, stdmo::relaxed))
            {
                // Node now belongs to us
                break;
            }
        }

        Job job = old_front.node->job;
        add_node_to_recycling_stack(old_front.node);

        return job;
    }
};

#include <cstdio>
#include <thread>
#include <barrier>

int main()
{
    JobQueue4 queue;
    auto barrier = std::barrier(3);

    auto producer1 = std::thread([&queue, &barrier]() {
        barrier.arrive_and_wait();
        for(int i = 1; i <= 200000; i++)
        {
            queue.push(Job { i, i });
            std::this_thread::yield();
        }
    });

    // 2 partial sums
    int sum1 = 0;
    int sum2 = 0;

    auto consumer_fn = [&queue, &barrier](int& sum) {
        barrier.arrive_and_wait();
        for(int i = 0; i < 100000; i++)
        {
            while(true)
            {
                std::optional<Job> job = queue.try_pop();
                if(job)
                {
                    sum += job->id;
                    break;
                }
            }
        }
    };

    auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
    auto consumer2 = std::thread(consumer_fn, std::ref(sum2));

    producer1.join();
    consumer1.join();
    consumer2.join();

    printf("magic number: %d\n", sum1 + sum2);
}

6 Problem #4: Internal data race

Just kidding! If we run with the modified test code below that forces the producer thread to yield after pushing every item (allowing consumers to “overtake” it):

Godbolt link  |  Fsmbolt link

Code
#include <cstdio>
#include <thread>
#include <barrier>

int main()
{
    JobQueue4 queue;
    auto barrier = std::barrier(3);

    auto producer1 = std::thread([&queue, &barrier]() {
        barrier.arrive_and_wait();
        for(int i = 1; i <= 200000; i++)
        {
            queue.push(Job { i, i });
            std::this_thread::yield();
        }
    });

    // 2 partial sums
    int sum1 = 0;
    int sum2 = 0;

    auto consumer_fn = [&queue, &barrier](int& sum) {
        barrier.arrive_and_wait();
        for(int i = 0; i < 100000; i++)
        {
            while(true)
            {
                std::optional<Job> job = queue.try_pop();
                if(job)
                {
                    sum += job->id;
                    break;
                }
            }
        }
    };

    auto consumer1 = std::thread(consumer_fn, std::ref(sum1));
    auto consumer2 = std::thread(consumer_fn, std::ref(sum2));

    producer1.join();
    consumer1.join();
    consumer2.join();

    printf("magic number: %d\n", sum1 + sum2);
}

Then we actually get another race! If we look at the line numbers that ThreadSanitizer complains about, we can find the two conflicting accesses.

This is what TSan claims is the “writer”:

return new Node();

And this is what it claims is the “reader”:

Node* old_front_next = old_front.node->next.load(stdmo::acquire);

What’s the race here? If we draw a synchronisation diagram, we can see the following:

The synchronisation diagram for this new race

There is indeed no synchronise-with relationship between T1 and T3. This means that we could potentially get a race between the reader on the right and the writer on the left. Even though this might seem like it violates causality (and/or involves time travel), it is still a data race.

6.1 Demo 5: The final fix

In order to fix this, we need to find a way to synchronise T2 and T3, so that T1 will synchronise-with T3 as well. We can do this by making the compare-exchange on the front-of-queue acquire-release instead, and the initial load of old_front an acquire load:

The synchronisation diagram for our fixed implementation

This corresponds to the following changes in code:

Godbolt link  |  Fsmbolt link

Code
std::optional<Job> try_pop()
{
    // most of it is the same...
    // but we changed this load to acquire
    GenNodePtr old_front = m_queue_front.load(stdmo::acquire);
    while(true)
    {
        Node* old_front_next = old_front.node->next.load(stdmo::acquire);
        if(old_front_next == QUEUE_END)
            return std::nullopt;

        GenNodePtr new_front { old_front_next, old_front.gen + 1 };

        // and we change this CAS to be acquire-release
        if(m_queue_front.compare_exchange_weak(old_front, //
            new_front, stdmo::acq_rel))
        {
            break;
        }
    }

    Job job = old_front.node->job;
    add_node_to_recycling_stack(old_front.node);

    return job;
}

Now, if we run it… we should finally, actually, hopefully, have a correct, working implementation of a lock-free concurrent queue.

7 Queue Benchmarks

Now that we have seen both a fine-grained-lock queue and a lock-free queue, how else can we learn which queue a situation calls for, if not through benchmarks!

The specific form of benchmarking we will be looking at is microbenchmarking; microbenchmarks track the performance of a single well-defined task, and are most useful for CPU work that is run many times (also known as hot code paths).

In this section, we will be benchmarking 3 concurrent queue implementations:

  1. a basic coarse-grained-lock queue
  2. the fine-grained-lock queue from Lecture 5
  3. the lock-free queue we just wrote
Coarse-grained-lock queue implementation
// locked-coarse-queue.cpp

#include <queue>
#include <mutex>
#include <optional>

struct Job
{
    int id;
    int data;
};

// Unbounded queue with push and non-blocking try_pop
class CoarseLockedJobQueue
{
    std::queue<Job> jobs;
    std::mutex mut;

public:
    CoarseLockedJobQueue() { }

    void push(Job job)
    {
        std::unique_lock lock { mut };

        jobs.push(job);
    }

    std::optional<Job> try_pop()
    {
        std::unique_lock lock { mut };

        if(jobs.empty())
        {
            return std::nullopt;
        }
        else
        {
            Job job = jobs.front();
            jobs.pop();

            return job;
        }
    }
};
Fine-grained-lock queue implementation
// locked-fine-queue.cpp

#include <mutex>
#include <optional>

struct Job
{
    int id;
    int data;
};

// Unbounded queue with push and non-blocking pop
// Use fine grained locking to decouple producers and consumers, though we
// no longer have blocking pop
class FineLockedJobQueue
{
    // A node can be a dummy node (contains no jobs and next == nullptr) or
    // it can be a regular node (contains a job and next != nullptr)
    // The last node in the queue (node at producer end) is always the dummy
    // node, all other nodes are regular nodes
    struct Node
    {
        std::mutex mut {};
        Node* next = nullptr;
        Job job {};
    };

    alignas(64) std::mutex mut_back;
    Node* jobs_back; // producer end

    alignas(64) std::mutex mut_front;
    Node* jobs_front; // consumer end

public:
    // Queue starts with a dummy node
    FineLockedJobQueue() //
        : mut_back()
        , jobs_back(new Node())
        , mut_front()
        , jobs_front(jobs_back)
    {
    }

    ~FineLockedJobQueue()
    {
        // Assumption: no other threads are accessing the job queue
        while(jobs_front != nullptr)
        {
            Node* next = jobs_front->next;
            delete jobs_front;
            jobs_front = next;
        }
    }

    void push(Job job)
    {
        // Make a new dummy node
        Node* new_node = new Node {};

        std::unique_lock lock { mut_back };
        // Turn old dummy node into regular node
        std::unique_lock lock_node { jobs_back->mut };

        jobs_back->job = job;
        jobs_back->next = new_node;
        jobs_back = new_node;
    }

    std::optional<Job> try_pop()
    {
        Node* old_node;
        {
            std::unique_lock lock { mut_front };
            old_node = jobs_front;

            std::unique_lock lock_node { old_node->mut };
            if(old_node->next == nullptr)
            {
                // Node was dummy, so the queue is empty
                return std::nullopt;
            }

            // Node was not dummy, delete node and return job
            jobs_front = jobs_front->next;
        }

        Job job = old_node->job;
        delete old_node;

        return job;
    }
};

7.1 Benchmark setup and metrics

When benchmarking a piece of code, we first decide on the metric by which to evaluate it. Some common examples include CPU cycles, MFLOPS, and real time (wall-clock time taken from the start to the end of a program).

To understand how each queue scales under contention with different numbers of producers and consumers, we will measure the real time taken for a few different setups (SPSC, SPMC, MPSC, and MPMC, with varying thread counts) for each queue.
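As a minimal illustration of measuring real time, here is a sketch using std::chrono (the names are illustrative, not part of the course harness; std::chrono::steady_clock is a good choice for intervals since it is guaranteed monotonic):

```cpp
#include <chrono>
#include <cstdint>

// Measure the wall-clock time of a callable, in microseconds.
template <typename F>
std::int64_t time_microseconds(F&& work)
{
    auto start = std::chrono::steady_clock::now();
    work();
    auto stop = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::microseconds>(stop - start)
        .count();
}

// Example workload: a spin loop standing in for "real" work.
std::int64_t spin_sum(int n)
{
    // volatile keeps the loop from being optimised away entirely
    volatile std::int64_t sum = 0;
    for(int i = 0; i < n; i++)
        sum = sum + i;
    return sum;
}
```

Usage would look like `auto us = time_microseconds([] { spin_sum(1'000'000); });` repeated over several runs, since a single wall-clock sample is noisy.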

7.2 Benchmarking code

In this setup, we use a std::barrier to ensure all producer and consumer threads have reached the start of the benchmark loop before starting the timer and commencing execution. When all threads have finished executing, we end the timer. Within the loop, we busily spin on a variable in between enqueues/dequeues to simulate work done on the threads, in an effort to emulate real-world usage.

Try running the code below on Fsmbolt (or locally if possible!) and observe how each queue performs in different setups. Note that the execution time is about 30 seconds, so please be patient.

Godbolt link  |  Fsmbolt link

Benchmark function
template <typename JobQueueImpl,
    std::ptrdiff_t ProducerCount,
    std::ptrdiff_t ConsumerCount>
sc::microseconds benchmark_prod_con()
{
    static_assert( //
        ((static_cast<int>(ProducerCount) * loop_iters) % ConsumerCount) == 0,
        "Total producer jobs must divide evenly among consumers");
    JobQueueImpl queue;

    // we use the main thread to keep track of the time
    auto thread_count = ProducerCount + ConsumerCount + 1;
    auto barrier = std::barrier(thread_count);

    auto producer_func = [&queue, &barrier](int id) {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> distrib(5, 15);
        barrier.arrive_and_wait();
        for(int i = 0; i < loop_iters; ++i)
        {
            queue.push(Job { id, i });
#ifndef HIGH_CONTENTION
            int work_cycles = distrib(gen);
            while(--work_cycles)
                __asm__("nop");
#endif
        }
    };

    auto consumer_iter = (ProducerCount * loop_iters) / ConsumerCount;
    auto consumer_func = [&queue, &barrier, consumer_iter]() {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> distrib(5, 15);
        barrier.arrive_and_wait();
        for(int i = 0; i < consumer_iter; ++i)
        {
            auto job = queue.try_pop();
            while(!job)
            {
                __asm__("pause");
                job = queue.try_pop();
            }
            do_something(*job);
#ifndef HIGH_CONTENTION
            int work_cycles = distrib(gen);
            while(--work_cycles)
                __asm__("nop");
#endif
        }
    };

    std::vector<std::thread> producers;
    for(size_t i = 1; i <= ProducerCount; ++i)
        producers.emplace_back(producer_func, i);

    std::vector<std::thread> consumers;
    for(size_t i = 1; i <= ConsumerCount; ++i)
        consumers.emplace_back(consumer_func);

    auto start = sc::high_resolution_clock::now();
    barrier.arrive_and_wait();

    for(size_t i = 0; i < ProducerCount; ++i)
        producers[i].join();

    for(size_t i = 0; i < ConsumerCount; ++i)
        consumers[i].join();

    auto stop = sc::high_resolution_clock::now();

    return sc::duration_cast<sc::microseconds>(stop - start);
}
Benchmark Results

Running with simulated work between consecutive enqueue/dequeues
SPSC benchmarks:
SPSC CoarseLockedJobQueue took: 260130µs
SPSC FineLockedJobQueue took: 233000µs
SPSC LockFreeJobQueue took: 266310µs

SPMC benchmarks:
1P2C CoarseLockedJobQueue took: 251312µs
1P2C FineLockedJobQueue took: 367206µs
1P2C LockFreeJobQueue took: 296053µs

1P4C CoarseLockedJobQueue took: 726801µs
1P4C FineLockedJobQueue took: 365763µs
1P4C LockFreeJobQueue took: 335175µs

MPSC benchmarks:
2P1C CoarseLockedJobQueue took: 324418µs
2P1C FineLockedJobQueue took: 958496µs
2P1C LockFreeJobQueue took: 429071µs

4P1C CoarseLockedJobQueue took: 867296µs
4P1C FineLockedJobQueue took: 2915416µs
4P1C LockFreeJobQueue took: 771916µs

MPMC benchmarks:
2P2C CoarseLockedJobQueue took: 484619µs
2P2C FineLockedJobQueue took: 698458µs
2P2C LockFreeJobQueue took: 604591µs

4P4C CoarseLockedJobQueue took: 1577709µs
4P4C FineLockedJobQueue took: 1466190µs
4P4C LockFreeJobQueue took: 1175500µs

8P8C CoarseLockedJobQueue took: 3701999µs
8P8C FineLockedJobQueue took: 3390677µs
8P8C LockFreeJobQueue took: 3067856µs
Extras 1

To view the effects of high contention on the queue, add the compilation flag -DHIGH_CONTENTION. This removes the simulated work between enqueues/dequeues and increases the contention on both ends of the queue. Observe how the performance of the lock free queue degrades.
Extras 2

In real-world use cases, the access patterns by different threads may differ. In the Godbolt links above, we have included another benchmark where each thread:

  1. Performs an enqueue
  2. Simulates some work
  3. Performs a dequeue
  4. Simulates some work

Add the compilation flag -DED_PAIR.

Observe how the performance of the lock free queue scales against the other queues.
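As a rough sketch of this enqueue/dequeue-pair access pattern (not the actual benchmark code; TinyQueue and run_pair_benchmark are illustrative stand-ins, with a simple mutex-based queue used only to keep the example self-contained):

```cpp
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

struct Item { int id; };

// Minimal locked queue, standing in for any of the three implementations.
struct TinyQueue
{
    std::queue<Item> q;
    std::mutex mut;

    void push(Item it)
    {
        std::unique_lock lock { mut };
        q.push(it);
    }

    std::optional<Item> try_pop()
    {
        std::unique_lock lock { mut };
        if(q.empty())
            return std::nullopt;
        Item it = q.front();
        q.pop();
        return it;
    }
};

// Each thread alternates enqueue and dequeue, with (elided) simulated work
// in between; every pushed item is eventually popped by some thread.
long run_pair_benchmark(int threads, int iters)
{
    TinyQueue queue;
    std::vector<long> sums(threads, 0); // one slot per thread: no data race
    std::vector<std::thread> pool;

    for(int t = 0; t < threads; t++)
    {
        pool.emplace_back([&queue, &sums, t, iters]() {
            for(int i = 1; i <= iters; i++)
            {
                queue.push(Item { i }); // 1. enqueue
                // 2. (simulated work would go here)
                std::optional<Item> item;
                while(!(item = queue.try_pop())) // 3. dequeue, spin if empty
                {
                }
                sums[t] += item->id;
                // 4. (simulated work would go here)
            }
        });
    }

    for(auto& th : pool)
        th.join();

    long total = 0;
    for(long s : sums)
        total += s;
    return total; // every pushed item was popped exactly once
}
```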

© CS3211 Teaching Team, All Rights Reserved
